#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <set>
#include <vector>
#include <unordered_set>

namespace bitrpc
{
    namespace server
    {
        // 发布订阅管理器
        class TopicManager
        {
        public:
            using ptr = std::shared_ptr<TopicManager>;
            TopicManager(){}
            //
            void onTopicRequest(const BaseConnection::ptr& conn,const TopicRequest::ptr& msg)
            {
                TopicOpType topic_optype = msg->optype();
                bool ret = true;
                switch(topic_optype)
                {
                    //主题的创建
                    case TopicOpType::TOPIC_CREATE: topicCreate(conn,msg);break;
                    //主题的删除
                    case TopicOpType::TOPIC_REMOVE: topicRemove(conn,msg);break;
                    //主题的订阅
                    case TopicOpType::TOPIC_SUBSCRIBE: ret = topicSubscribe(conn,msg);break;
                    //主题的取消订阅
                    case TopicOpType::TOPIC_CANCEL: topicCancel(conn,msg);break;
                    //主题消息的发布
                    case TopicOpType::TOPIC_PUBLISH: ret = topicPublish(conn,msg);break;
                    default: return errorResponse(conn,msg,RCode::RCODE_INVALID_OPTYPE);//无效的操作类型
                } 
                if(!ret) return errorResponse(conn,msg,RCode::RCODE_NOT_FOUND_TOPIC);
                return topicResponse(conn,msg);   
            }
            //一个订阅者在连接断开时的处理,删除其关联的数据
            void onShutdown(const BaseConnection::ptr& conn)
            {
                //消息发布者断开连接，不需要任何操作；  消息订阅者断开连接需要删除管理数据
                //1. 判断断开连接的是否是订阅者，不是的话则直接返回
                std::vector<Topic::ptr> topics;
                Subscriber::ptr subscriber;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _subscribers.find(conn);//查找订阅者
                    if(it == _subscribers.end()) 
                    {
                        return;//不是订阅者，直接返回
                    }
                    subscriber = it->second;//获取到订阅者
                    //2. 获取到订阅者退出，受影响的主题对象
                    for(auto &topic_name : subscriber->topics)//遍历订阅者订阅的主题
                    {
                        auto topic_it = _topics.find(topic_name);
                        if(topic_it == _topics.end())
                        {
                            continue;
                        }
                        topics.push_back(topic_it->second);
                    }
                    //4. 从订阅者映射信息中，删除订阅者
                    _subscribers.erase(it);
                }
                 //3. 从受影响的主题对象中，移除订阅者
                 for(auto &topic : topics)
                 {
                     topic->removeSubscriber(subscriber);
                 }
            }

        private:
        //请求出错的响应
        void errorResponse(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg,RCode rcode)
            {
                auto msg_rsp = MessageFactory::create<TopicResponse>();
                msg_rsp->setId(msg->rid());
                msg_rsp->setMType(MType::RSP_TOPIC);
                msg_rsp->setRCode(rcode);   
                conn->send(msg_rsp);
            }
            //请求成功的响应
            void topicResponse(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                auto msg_rsp = MessageFactory::create<TopicResponse>();
                msg_rsp->setId(msg->rid());
                msg_rsp->setMType(MType::RSP_TOPIC);
                msg_rsp->setRCode(RCode::RCODE_OK);
                conn->send(msg_rsp);
            }


        //主题的创建
        void topicCreate(const BaseConnection::ptr &conn,const TopicRequest::ptr &msg)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            //构造一个主题的对象,添加映射关系的管理
            std::string topic_name = msg->topicKey();//获取主题名称
            auto topic = std::make_shared<Topic>(topic_name);
            _topics.insert(std::make_pair(topic_name,topic));//将主题添加到管理器中
        }

        //主题的删除
        void topicRemove(const BaseConnection::ptr &conn,const TopicRequest::ptr &msg)
        {
            // 1. 查看当前主题，有哪些订阅者，然后从订阅者中将主题信息删除掉
            // 2. 删除主题的数据 -- 主题名称与主题对象的映射关系
            std::string topic_name = msg->topicKey();
            std::unordered_set<Subscriber::ptr> subscribers;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                //在删除主题之前,先找出会受影响的订阅者        
                auto it = _topics.find(topic_name);
                if(it == _topics.end())
                {
                    return;
                }
                //获取当前主题的订阅者
                subscribers = it->second->subscribers;
                _topics.erase(it);//删除主题
            }
            for(auto &subscriber : subscribers)
            {
                subscriber->removeTopic(topic_name);//将订阅者中的主题信息删除
            }
        }

        //主题的订阅
        bool topicSubscribe(const BaseConnection::ptr &conn,const TopicRequest::ptr &msg)
        {
            //1. 先找出主题对象，以及订阅者对象
            //   如果没有找到主题--就要报错；  但是如果没有找到订阅者对象，那就要构造一个订阅者
            Topic::ptr topic;
            Subscriber::ptr subscriber;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto topic_it = _topics.find(msg->topicKey());//msg->topicKey():获取主题名称
                if(topic_it == _topics.end())
                {
                    return false;
                }
                topic = topic_it->second;//获取主题对象
                auto sub_it = _subscribers.find(conn);
                if(sub_it != _subscribers.end())
                {
                    subscriber = sub_it->second;//获取订阅者对象
                }
                else
                {
                    subscriber = std::make_shared<Subscriber>(conn);//构造一个订阅者对象
                    _subscribers.insert(std::make_pair(conn,subscriber));//将订阅者对象添加到订阅者集合中
                } 
            }
            //2. 在主题对象中，新增一个订阅者对象关联的连接；  在订阅者对象中新增一个订阅的主题
            topic->appendSubscriber(subscriber);
            subscriber->appendTopic(msg->topicKey());

            return true;
        }

        //主题的取消订阅
        void topicCancel(const BaseConnection::ptr &conn,const TopicRequest::ptr &msg)
        {
            //1. 先找出主题对象，以及订阅者对象
            Topic::ptr topic;
            Subscriber::ptr subscriber;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto topic_it = _topics.find(msg->topicKey());//msg->topicKey():获取主题名称
                if(topic_it != _topics.end())
                {
                    topic = topic_it->second;//获取主题对象
                }
                auto sub_it = _subscribers.find(conn);
                if(sub_it != _subscribers.end())
                {
                    subscriber = sub_it->second;//获取订阅者对象
                }
                
            }
            //2. 从主题对象中删除当前的订阅者连接；   从订阅者信息中删除所订阅的主题名称
            if(subscriber) subscriber->removeTopic(msg->topicKey());
            if(topic && subscriber) topic->removeSubscriber(subscriber);
            
        }

        //主题的发布消息
        bool topicPublish(const BaseConnection::ptr &conn,const TopicRequest::ptr &msg)
        {
            Topic::ptr topic;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto topic_it = _topics.find(msg->topicKey());//msg->topicKey():获取主题名称
                if(topic_it == _topics.end())
                {
                    return false;
                }
                topic = topic_it->second;//获取主题对象
            }
            topic->pushMessage(msg);//将消息推送给订阅者
            return true;
        }

        private:
        //订阅者
        struct Subscriber 
        {
            using ptr = std::shared_ptr<Subscriber>;
            std::mutex _mutex;
            BaseConnection::ptr conn;
            std::unordered_set<std::string> topics;//订阅者所订阅的主题名称

            Subscriber(const BaseConnection::ptr& c)
                :conn(c)
            {}
            //订阅者添加订阅主题
            void appendTopic(const std::string &topic_name)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                topics.insert(topic_name);
            }
            //订阅者移除订阅主题或者主题被删除
            void removeTopic(const std::string &topic_name)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                topics.erase(topic_name);
            }
        };
        struct Topic
        {
            using ptr = std::shared_ptr<Topic>;
            std::string topic_name;
            std::mutex _mutex;
            std::unordered_set<Subscriber::ptr> subscribers;//当前主题的订阅者

            Topic(const std::string& topic_name)
                :topic_name(topic_name)
            {}
            //添加订阅者:新增订阅
            void appendSubscriber(const Subscriber::ptr& subscriber)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                subscribers.insert(subscriber);
            }
            //移除订阅者:取消订阅或者订阅者连接断开
            void removeSubscriber(const Subscriber::ptr& subscriber)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                subscribers.erase(subscriber);
            }
            //订阅者推送消息:收到消息发布请求
            void pushMessage(const BaseMessage::ptr& msg)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                for(auto &subscriber : subscribers)
                {
                    subscriber->conn->send(msg);
                }
            }
        };
        
        private:
        std::mutex _mutex;
        std::unordered_map<std::string,Topic::ptr> _topics;//主题名称与主题的映射
        std::unordered_map<BaseConnection::ptr,Subscriber::ptr> _subscribers;//连接与订阅者的映射
        };
    }
}